# mypy: disable-error-code=name-defined
"""Amazon OpenSearch Write Module (PRIVATE)."""

from __future__ import annotations

import ast
import json
import logging
from typing import TYPE_CHECKING, Any, Generator, Iterable, Mapping, cast

import boto3
import numpy as np
from pandas import notna

import awswrangler.pandas as pd
from awswrangler import _utils, exceptions
from awswrangler._utils import parse_path
from awswrangler.opensearch._utils import _get_distribution, _get_version_major, _is_serverless

if TYPE_CHECKING:
    try:
        import jsonpath_ng
    except ImportError:
        pass
else:
    jsonpath_ng = _utils.import_optional_dependency("jsonpath_ng")


if TYPE_CHECKING:
    try:
        import opensearchpy
    except ImportError:
        pass
else:
    opensearchpy = _utils.import_optional_dependency("opensearchpy")

if TYPE_CHECKING:
    try:
        import progressbar
    except ImportError:
        pass
else:
    progressbar = _utils.import_optional_dependency("progressbar")


_logger: logging.Logger = logging.getLogger(__name__)

_DEFAULT_REFRESH_INTERVAL = "1s"


def _selected_keys(document: Mapping[str, Any], keys_to_write: list[str] | None) -> Mapping[str, Any]:
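    """Return a copy of ``document`` restricted to ``keys_to_write``, always excluding the reserved ``_id`` key."""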
    if keys_to_write is None:
        keys_to_write = list(document.keys())
    keys_to_write = list(filter(lambda x: x != "_id", keys_to_write))
    return {key: document[key] for key in keys_to_write}


def _actions_generator(
    documents: Iterable[dict[str, Any]] | Iterable[Mapping[str, Any]],
    index: str,
    doc_type: str | None,
    keys_to_write: list[str] | None,
    id_keys: list[str] | None,
    bulk_size: int = 10000,
) -> Generator[list[dict[str, Any]], None, None]:
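    """Yield ``documents`` as chunks of at most ``bulk_size`` bulk actions.

    The document id is joined from ``id_keys`` values when provided, otherwise taken from the
    ``_id`` key (a missing ``_id`` lets the engine auto-generate one).
    """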
    bulk_chunk_documents = []
    for i, document in enumerate(documents):
        if id_keys:
            _id = "-".join([str(document[id_key]) for id_key in id_keys])
        else:
            _id = cast(str, document.get("_id"))
        bulk_chunk_documents.append(
            {
                "_index": index,
                "_type": doc_type,
                "_id": _id,
                "_source": _selected_keys(document, keys_to_write),
            }
        )
        if (i + 1) % bulk_size == 0:
            yield bulk_chunk_documents
            bulk_chunk_documents = []
    if len(bulk_chunk_documents) > 0:
        yield bulk_chunk_documents


def _df_doc_generator(df: pd.DataFrame) -> Generator[dict[str, Any], None, None]:
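    """Yield DataFrame rows as documents, dropping NA values and deserializing JSON-like strings."""
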
    def _deserialize(v: Any) -> Any:
        if isinstance(v, str):
            v = v.strip()
            if (v.startswith("{") and v.endswith("}")) or (v.startswith("[") and v.endswith("]")):
                try:
                    v = json.loads(v)
                except json.decoder.JSONDecodeError:
                    try:
                        v = ast.literal_eval(v)  # if properties are enclosed with single quotes
                        if not isinstance(v, (dict, list)):
                            _logger.warning("could not convert string to json: %s", v)
                    except (SyntaxError, ValueError) as e:  # ast.literal_eval raises either on malformed input
                        _logger.warning("could not convert string to json: %s", v)
                        _logger.warning(e)
        return v

    df_iter = df.iterrows()
    for _, document in df_iter:
        yield {k: _deserialize(v) for k, v in document.items() if np.array(notna(v)).any()}


def _file_line_generator(path: str, is_json: bool = False) -> Generator[Any, None, None]:
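    """Yield lines of a local file, parsed as JSON objects when ``is_json`` is True."""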
    with open(path) as fp:
        for line in fp:
            if is_json:
                yield json.loads(line)
            else:
                yield line.strip()


@_utils.check_optional_dependency(jsonpath_ng, "jsonpath_ng")
def _get_documents_w_json_path(documents: list[Mapping[str, Any]], json_path: str) -> list[Any]:
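    """Return the list/dict values matched by ``json_path`` across ``documents``, flattened into a single list."""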
    from jsonpath_ng.exceptions import JsonPathParserError  # noqa: PLC0415

    try:
        jsonpath_expression = jsonpath_ng.parse(json_path)
    except JsonPathParserError as e:
        _logger.error("invalid json_path: %s", json_path)
        raise e
    output_documents = []
    for doc in documents:
        for match in jsonpath_expression.find(doc):
            match_value = match.value
            if isinstance(match_value, list):
                output_documents += match_value
            elif isinstance(match_value, dict):
                output_documents.append(match_value)
            else:
                msg = f"expected json_path value to be a list/dict. received type {type(match_value)} ({match_value})"
                raise ValueError(msg)
    return output_documents


def _get_refresh_interval(client: "opensearchpy.OpenSearch", index: str) -> Any:
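    """Return the current ``refresh_interval`` of ``index``, falling back to the default when the index is missing."""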
    url = f"/{index}/_settings"
    try:
        response = client.transport.perform_request("GET", url)
        index_settings = response.get(index, {}).get("index", {})
        refresh_interval = index_settings.get("refresh_interval", _DEFAULT_REFRESH_INTERVAL)
        return refresh_interval
    except opensearchpy.exceptions.NotFoundError:
        return _DEFAULT_REFRESH_INTERVAL


def _set_refresh_interval(client: "opensearchpy.OpenSearch", index: str, refresh_interval: Any | None) -> Any:
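    """Set the ``refresh_interval`` of ``index``; return None when the settings update is rejected."""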
    url = f"/{index}/_settings"
    body = {"index": {"refresh_interval": refresh_interval}}
    try:
        return client.transport.perform_request("PUT", url, headers={"content-type": "application/json"}, body=body)
    except opensearchpy.exceptions.RequestError:
        return None


def _disable_refresh_interval(
    client: "opensearchpy.OpenSearch",
    index: str,
) -> Any:
    return _set_refresh_interval(client=client, index=index, refresh_interval="-1")


@_utils.check_optional_dependency(opensearchpy, "opensearchpy")
def create_index(
    client: "opensearchpy.OpenSearch",
    index: str,
    doc_type: str | None = None,
    settings: dict[str, Any] | None = None,
    mappings: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Create an index.

    Parameters
    ----------
    client
        instance of opensearchpy.OpenSearch to use.
    index
        Name of the index.
    doc_type
        Name of the document type (for Elasticsearch versions 5.x and earlier).
    settings
        Index settings
        https://opensearch.org/docs/opensearch/rest-api/create-index/#index-settings
    mappings
        Index mappings
        https://opensearch.org/docs/opensearch/rest-api/create-index/#mappings

    Returns
    -------
        OpenSearch rest api response
        https://opensearch.org/docs/opensearch/rest-api/create-index/#response.

    Examples
    --------
    Creating an index.

    >>> import awswrangler as wr
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> response = wr.opensearch.create_index(
    ...     client=client,
    ...     index="sample-index1",
    ...     mappings={
    ...        "properties": {
    ...          "age":  { "type" : "integer" }
    ...        }
    ...     },
    ...     settings={
    ...         "index": {
    ...             "number_of_shards": 2,
    ...             "number_of_replicas": 1
    ...          }
    ...     }
    ... )

    """
    body = {}
    if mappings:
        if _get_distribution(client) == "opensearch" or _get_version_major(client) >= 7:
            body["mappings"] = mappings  # doc type deprecated
        elif doc_type:
            body["mappings"] = {doc_type: mappings}
        else:
            body["mappings"] = {index: mappings}
    if settings:
        body["settings"] = settings
    if not body:
        body = None  # type: ignore[assignment]

    # ignore 400 caused by IndexAlreadyExistsException when creating an index
    response: dict[str, Any] = client.indices.create(index=index, body=body, ignore=400)
    if "error" in response:
        _logger.warning(response)
        if str(response["error"]).startswith("MapperParsingException"):
            raise ValueError(response["error"])
    return response


@_utils.check_optional_dependency(opensearchpy, "opensearchpy")
def delete_index(client: "opensearchpy.OpenSearch", index: str) -> dict[str, Any]:
    """Delete an index.

    Parameters
    ----------
    client
        instance of opensearchpy.OpenSearch to use.
    index
        Name of the index.

    Returns
    -------
        OpenSearch rest api response

    Examples
    --------
    Deleting an index.

    >>> import awswrangler as wr
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> response = wr.opensearch.delete_index(
    ...     client=client,
    ...     index="sample-index1"
    ... )

    """
    # ignore 400/404 IndexNotFoundError exception
    response: dict[str, Any] = client.indices.delete(index=index, ignore=[400, 404])
    if "error" in response:
        _logger.warning(response)
    return response


@_utils.check_optional_dependency(opensearchpy, "opensearchpy")
def index_json(
    client: "opensearchpy.OpenSearch",
    path: str,
    index: str,
    doc_type: str | None = None,
    boto3_session: boto3.Session | None = None,
    json_path: str | None = None,
    use_threads: bool | int = False,
    **kwargs: Any,
) -> Any:
    """Index all documents from JSON file to OpenSearch index.

    The JSON file should be in JSON Lines text format (newline-delimited JSON) - https://jsonlines.org/
    OR, if the file contains a single large JSON document, please provide `json_path`.

    Parameters
    ----------
    client
        instance of opensearchpy.OpenSearch to use.
    path
        S3 or local path to the JSON file which contains the documents.
    index
        Name of the index.
    doc_type
        Name of the document type (for Elasticsearch versions 5.x and earlier).
    boto3_session
        Boto3 Session to be used to access S3 if **path** is provided.
        The default boto3 session will be used if **boto3_session** is ``None``.
    json_path
        JsonPath expression specifying an explicit path to a single element
        in the JSON hierarchical data structure.
        Read more about `JsonPath <https://jsonpath.com>`_
    use_threads
        True to enable concurrent requests, False to disable multiple threads.
        If enabled, ``os.cpu_count()`` is used as the maximum number of threads.
        If an integer is provided, that number of threads is used.
    **kwargs
        KEYWORD arguments forwarded to :func:`~awswrangler.opensearch.index_documents`
        which is used to execute the operation

    Returns
    -------
        Response payload
        https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#response.

    Examples
    --------
    Writing contents of JSON file

    >>> import awswrangler as wr
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> wr.opensearch.index_json(
    ...     client=client,
    ...     path='docs.json',
    ...     index='sample-index1'
    ... )
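
    Writing documents from a single large JSON file using ``json_path``
    (a sketch; assumes the documents are stored as a list under a top-level ``records`` key)

    >>> import awswrangler as wr
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> wr.opensearch.index_json(
    ...     client=client,
    ...     path='docs.json',
    ...     index='sample-index1',
    ...     json_path='$.records'
    ... )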
    """
    _logger.debug("indexing %s from %s", index, path)

    if path.startswith("s3://"):
        bucket, key = parse_path(path)
        s3 = _utils.client(service_name="s3", session=boto3_session)
        obj = s3.get_object(Bucket=bucket, Key=key)
        body = obj["Body"].read()
        documents = [json.loads(line) for line in body.splitlines()]
    else:  # local path
        documents = list(_file_line_generator(path, is_json=True))
    if json_path:
        documents = _get_documents_w_json_path(documents, json_path)
    return index_documents(
        client=client, documents=documents, index=index, doc_type=doc_type, use_threads=use_threads, **kwargs
    )


@_utils.check_optional_dependency(opensearchpy, "opensearchpy")
def index_csv(
    client: "opensearchpy.OpenSearch",
    path: str,
    index: str,
    doc_type: str | None = None,
    pandas_kwargs: dict[str, Any] | None = None,
    use_threads: bool | int = False,
    **kwargs: Any,
) -> Any:
    """Index all documents from a CSV file to OpenSearch index.

    Parameters
    ----------
    client
        instance of opensearchpy.OpenSearch to use.
    path
        S3 or local path to the CSV file which contains the documents.
    index
        Name of the index.
    doc_type
        Name of the document type (for Elasticsearch versions 5.x and earlier).
    pandas_kwargs
        Dictionary of arguments forwarded to pandas.read_csv().
        e.g. pandas_kwargs={'sep': '|', 'na_values': ['null', 'none']}
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
        Note: this parameter value is enforced: `skip_blank_lines=True`
    use_threads
        True to enable concurrent requests, False to disable multiple threads.
        If enabled, ``os.cpu_count()`` is used as the maximum number of threads.
        If an integer is provided, that number of threads is used.
    **kwargs
        KEYWORD arguments forwarded to :func:`~awswrangler.opensearch.index_documents`
        which is used to execute the operation

    Returns
    -------
        Response payload
        https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#response.

    Examples
    --------
    Writing contents of CSV file

    >>> import awswrangler as wr
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> wr.opensearch.index_csv(
    ...     client=client,
    ...     path='docs.csv',
    ...     index='sample-index1'
    ... )

    Writing contents of CSV file using pandas_kwargs

    >>> import awswrangler as wr
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> wr.opensearch.index_csv(
    ...     client=client,
    ...     path='docs.csv',
    ...     index='sample-index1',
    ...     pandas_kwargs={'sep': '|', 'na_values': ['null', 'none']}
    ... )
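
    Writing contents of CSV file while deriving each document id from a column
    (a sketch; assumes the CSV has an ``id`` column - ``id_keys`` is forwarded to ``index_documents``)

    >>> import awswrangler as wr
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> wr.opensearch.index_csv(
    ...     client=client,
    ...     path='docs.csv',
    ...     index='sample-index1',
    ...     id_keys=['id']
    ... )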
    """
    _logger.debug("indexing %s from %s", index, path)
    if pandas_kwargs is None:
        pandas_kwargs = {}
    enforced_pandas_params = {
        "skip_blank_lines": True,
        # 'na_filter': True  # generates NaN values for empty cells; we remove NaN keys in _df_doc_generator
        # Note: if the user passes na_filter=False, null fields will be indexed as well ({"k1": null, "k2": null})
    }
    pandas_kwargs.update(enforced_pandas_params)
    df = pd.read_csv(path, **pandas_kwargs)
    return index_df(client, df=df, index=index, doc_type=doc_type, use_threads=use_threads, **kwargs)


@_utils.check_optional_dependency(opensearchpy, "opensearchpy")
def index_df(
    client: "opensearchpy.OpenSearch",
    df: pd.DataFrame,
    index: str,
    doc_type: str | None = None,
    use_threads: bool | int = False,
    **kwargs: Any,
) -> Any:
    """Index all documents from a DataFrame to OpenSearch index.

    Parameters
    ----------
    client
        instance of opensearchpy.OpenSearch to use.
    df
        `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
    index
        Name of the index.
    doc_type
        Name of the document type (for Elasticsearch versions 5.x and earlier).
    use_threads
        True to enable concurrent requests, False to disable multiple threads.
        If enabled, ``os.cpu_count()`` is used as the maximum number of threads.
        If an integer is provided, that number of threads is used.
    **kwargs
        KEYWORD arguments forwarded to :func:`~awswrangler.opensearch.index_documents`
        which is used to execute the operation

    Returns
    -------
        Response payload
        https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#response.

    Examples
    --------
    Writing rows of DataFrame

    >>> import awswrangler as wr
    >>> import pandas as pd
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> wr.opensearch.index_df(
    ...     client=client,
    ...     df=pd.DataFrame([{'_id': '1'}, {'_id': '2'}, {'_id': '3'}]),
    ...     index='sample-index1',
    ... )
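
    Writing rows of DataFrame while deriving each document id from a column
    (a sketch; ``id_keys`` is forwarded to ``index_documents``)

    >>> import awswrangler as wr
    >>> import pandas as pd
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> wr.opensearch.index_df(
    ...     client=client,
    ...     df=pd.DataFrame([{'name': 'foo'}, {'name': 'bar'}]),
    ...     index='sample-index1',
    ...     id_keys=['name']
    ... )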
    """
    return index_documents(
        client=client,
        documents=_df_doc_generator(df),
        index=index,
        doc_type=doc_type,
        use_threads=use_threads,
        **kwargs,
    )


@_utils.check_optional_dependency(opensearchpy, "opensearchpy")
def index_documents(
    client: "opensearchpy.OpenSearch",
    documents: Iterable[Mapping[str, Any]],
    index: str,
    doc_type: str | None = None,
    keys_to_write: list[str] | None = None,
    id_keys: list[str] | None = None,
    ignore_status: list[Any] | tuple[Any, ...] | None = None,
    bulk_size: int = 1000,
    chunk_size: int | None = 500,
    max_chunk_bytes: int | None = 100 * 1024 * 1024,
    max_retries: int | None = None,
    initial_backoff: int | None = None,
    max_backoff: int | None = None,
    use_threads: bool | int = False,
    enable_refresh_interval: bool = True,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    Index all documents to OpenSearch index.

    Note
    ----
    `max_retries`, `initial_backoff`, and `max_backoff` are not supported with parallel bulk
    (when `use_threads` is set to True).

    Note
    ----
    Some of the args are referenced from opensearch-py client library (bulk helpers)
    https://opensearch-py.readthedocs.io/en/latest/helpers.html#opensearchpy.helpers.bulk
    https://opensearch-py.readthedocs.io/en/latest/helpers.html#opensearchpy.helpers.streaming_bulk

    If you receive `Error 429 (Too Many Requests) /_bulk`, please try decreasing the `bulk_size` value.
    Please also consider modifying the cluster size and instance type -
    Read more here: https://aws.amazon.com/premiumsupport/knowledge-center/resolve-429-error-es/

    Parameters
    ----------
    client
        instance of opensearchpy.OpenSearch to use.
    documents
        List which contains the documents that will be inserted.
    index
        Name of the index.
    doc_type
        Name of the document type (for Elasticsearch versions 5.x and earlier).
    keys_to_write
        list of keys to index. If not provided, all keys will be indexed.
    id_keys
        list of keys that compose the document's unique id. If not provided, the `_id` key is used when present;
        otherwise OpenSearch generates a unique identifier for each document.
    ignore_status
        list of HTTP status codes that you want to ignore (not raising an exception)
    bulk_size
        number of docs in each _bulk request (default: 1000)
    chunk_size
        number of docs in one chunk sent to the cluster (default: 500)
    max_chunk_bytes
        the maximum size of the request in bytes (default: 100MB)
    max_retries
        maximum number of times a document will be retried when
        ``429`` is received; set to ``0`` for no retries on ``429`` (default: 5)
    initial_backoff
        number of seconds we should wait before the first retry.
        Any subsequent retry will wait ``initial_backoff * 2**retry_number`` seconds (default: 2)
    max_backoff
        maximum number of seconds a retry will wait (default: 600)
    use_threads
        True to enable concurrent requests, False to disable multiple threads.
        If enabled, ``os.cpu_count()`` is used as the maximum number of threads.
        If an integer is provided, that number of threads is used.
    enable_refresh_interval
        True (default) to enable ``refresh_interval`` modification to ``-1`` (disabled) while indexing documents
    **kwargs
        KEYWORD arguments forwarded to bulk operation
        elasticsearch >= 7.10.2 / opensearch: \
https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#url-parameters
        elasticsearch < 7.10.2: \
https://opendistro.github.io/for-elasticsearch-docs/docs/elasticsearch/rest-api-reference/#url-parameters

    Returns
    -------
        Response payload
        https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#response.

    Examples
    --------
    Writing documents

    >>> import awswrangler as wr
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> wr.opensearch.index_documents(
    ...     client=client,
    ...     documents=[{'_id': '1', 'value': 'foo'}, {'_id': '2', 'value': 'bar'}],
    ...     index='sample-index1'
    ... )
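
    Writing documents while restricting the indexed keys and deriving ids
    (a sketch; ``keys_to_write`` drops the other fields and ``id_keys`` builds each document id)

    >>> wr.opensearch.index_documents(
    ...     client=client,
    ...     documents=[{'id': '1', 'value': 'foo', 'debug': 'x'}],
    ...     index='sample-index1',
    ...     id_keys=['id'],
    ...     keys_to_write=['id', 'value']
    ... )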
    """
    if "refresh" in kwargs and _is_serverless(client):
        raise exceptions.NotSupported("Refresh policy not supported in OpenSearch Serverless.")

    if use_threads and any(arg is not None for arg in (max_retries, initial_backoff, max_backoff)):
        raise exceptions.InvalidArgumentCombination(
            "`max_retries`, `initial_backoff`, and `max_backoff` are not supported when `use_threads` is set to True"
        )

    if not isinstance(documents, list):
        documents = list(documents)
    total_documents = len(documents)
    _logger.debug("indexing %s documents into %s", total_documents, index)

    actions = _actions_generator(
        documents, index, doc_type, keys_to_write=keys_to_write, id_keys=id_keys, bulk_size=bulk_size
    )

    success = 0
    errors: list[Any] = []
    refresh_interval = None
    try:
        if progressbar:
            widgets = [
                progressbar.Percentage(),  # type: ignore[no-untyped-call]
                progressbar.SimpleProgress(format=" (%(value_s)s/%(max_value_s)s)"),  # type: ignore[no-untyped-call]
                progressbar.Bar(),  # type: ignore[no-untyped-call]
                progressbar.Timer(),  # type: ignore[no-untyped-call]
            ]
            progress_bar = progressbar.ProgressBar(
                widgets=widgets, max_value=total_documents, prefix="Indexing: "
            ).start()
        for i, bulk_chunk_documents in enumerate(actions):
            # From the second bulk chunk on: the first bulk may have just created the index
            if i == 1 and enable_refresh_interval:
                refresh_interval = _get_refresh_interval(client, index)
                _disable_refresh_interval(client, index)
            _logger.debug("running bulk index of %s documents", len(bulk_chunk_documents))
            bulk_kwargs = {
                "ignore_status": ignore_status,
                "chunk_size": chunk_size,
                "max_chunk_bytes": max_chunk_bytes,
                "request_timeout": 30,
                **kwargs,
            }
            _logger.debug("running bulk with kwargs: %s", bulk_kwargs)
            if use_threads:
                # Parallel bulk does not support max_retries, initial_backoff & max_backoff
                for _success, _errors in opensearchpy.helpers.parallel_bulk(
                    client, bulk_chunk_documents, **bulk_kwargs
                ):
                    success += _success
                    errors += _errors
            else:
                # Defaults, applied only when the caller left the argument as None
                bulk_kwargs["max_retries"] = 5 if max_retries is None else max_retries
                bulk_kwargs["initial_backoff"] = 2 if initial_backoff is None else initial_backoff
                bulk_kwargs["max_backoff"] = 600 if max_backoff is None else max_backoff

                _success, _errors = opensearchpy.helpers.bulk(client, bulk_chunk_documents, **bulk_kwargs)
                success += _success
                errors += _errors
            _logger.debug("indexed %s documents (%s/%s)", _success, success, total_documents)
            if progressbar:
                progress_bar.update(success, force=True)
    except opensearchpy.TransportError as e:
        if str(e.status_code) == "429":  # Too Many Requests
            _logger.error(
                "Error 429 (Too Many Requests): "
                "Try to tune the bulk_size parameter. "
                "Read more here: https://aws.amazon.com/premiumsupport/knowledge-center/resolve-429-error-es"
            )
        raise e

    finally:
        if enable_refresh_interval:
            _set_refresh_interval(client, index, refresh_interval)

    return {"success": success, "errors": errors}
